Programmable LED 〜Slackから点灯パターンを設定できるクリスマスツリー用のLEDデバイスを作ってみました〜
1 はじめに
CX 事業本部 delivery部の平内(SIN)です。
本記事は、100円ショップで販売されているテープライトを「クリスマスツリーで使って見たい!」ということで、簡単な工作をした記録です。
完全に個人的な趣味で申し訳ありません。少しでも興味が持てそうでしたら、続けて読んで頂ければ嬉しいです。
最初に完成したものを見てやってください。
Slackでパターンを投稿するとツリーの点灯が変化します。「職場のみんなで、色々なパターンを作って楽しめたらいいなぁ〜」という妄想が元となっています。
2 ハードウエア
(1) USBコネクタ
とりあえずUSBコネクタ(メス)が4つ欲しかったのですが、もう使っていない古いUSBハブがあったので、バラしました。
コネクタは表面実装でしたが、ハンドコテ2本で、全体的に溶かすことで、うまく外す事ができました。
(2) ユニバーサル基板への取り付け
表面実装の部品なため、ユニバーサル基板に差し込む足が無いので、ワイヤーで結んで半田で固めちゃいました。
+と-の端子にそのまま接続して、点灯できるかどうか確認している様子です。
(3) 組み立て
雑ですが、設計図です。基盤の裏から見た絵を書いてみて、そのまま作ってます。
トランジスタは、手元に残っていた、2SC1815を使用しました。
電流制限とプルダウンは、不要になった機器から外したチップ抵抗です。作業中に過って紛失したりしても、いっぱいあるので、全然余裕です。
チップ部品のハンダ付けは、マイクロスコープが有れば、老眼でも大丈です。
とりあえず、動作確認しながら、1つ作って行きました。
(4) ケース
ケース用に塩ビ板を買ってきました。
ユニバーサル基板の上下に配置するために同じサイズで2枚カットしてます。
ユニバーサル基盤に合わせて四隅に穴を開けました。
後は、なんとかサイズの合いそうなネジを探して組み立ててます。ネジは、過去に、趣味で家電などを分解して集めたものです。
ケースといいながら、基盤への接触を避けるため、上下に塩ビ板を付けただけです。
3 ESP32
作成したデバイスのロジックのON/OFFは、ESP32で行っています。併せて、MQTTで点灯パターンの指示を受けるようになっています。
(1) MicroPython
ESP32は、MicroPythonで使用していますが、初期化や利用方法については、下記で紹介しているものと同じです。
(2) 接続
ESP32と作成したハードウエアを接続した様子です。
下記のGPIOが、USB端子のロジック(L1〜L4)に接続されています。
GPIO12 L1 WHITE GPIO14 L2 RED GPIO27 L3 YELLOW GPIO26 L4 BLUE
(3) コード
ESP32で動作しているコードです。
pattern_list[]が、4つのUSBコネクタの電源を0.1秒単位でON/OFFする 0,1 のパターンです。
MQTTで、AWS IoT Coreに接続し、「christmas-tree/デバイス番号」をSubscribeしています。 メッセージは、下記のような形式となっており、受け取った時点で、pattern_list[]を書き換えます。
各色のパターンの長さは、特に制限がありません。一番長いパターンに併せて、足りない分は0で埋められます。
payload = { "white" : "000000000000000000000000001010100000", "red" : "0000110011000011000000001100000000", "blue" : "1010001000101000101010100000000000", "yellow" : "000000000000000000000000001010100000" }
main.py
import os import time import json import machine import network from simple import MQTTClient state = "OK" wifi_ssid = "XXXX" wifi_password = "XXXXXXXX" aws_endpoint = b"xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" private_key = "private.pem.key" private_cert = "cert.pem.crt" pattern_list = ["1111100000", "1111100000", "1111100000", "1111100000"] with open(private_key, "r") as f: key = f.read() with open(private_cert, "r") as f: cert = f.read() device_id = "0001" topic_sub = "christmas-tree/" + device_id ssl_params = {"key": key, "cert": cert, "server_side": False} led_list = [] led_list.append(machine.Pin(12, machine.Pin.OUT)) # GPIO12 L1 WHITE led_list.append(machine.Pin(14, machine.Pin.OUT)) # GPIO14 L2 RED led_list.append(machine.Pin(27, machine.Pin.OUT)) # GPIO27 L3 YELLOW led_list.append(machine.Pin(26, machine.Pin.OUT)) # GPIO26 L4 BLUE wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print("Connecting to network...") wlan.connect(wifi_ssid, wifi_password) while not wlan.isconnected(): pass print("Connection successful") print("Network config:", wlan.ifconfig()) def mqtt_connect(client=device_id, endpoint=aws_endpoint, sslp=ssl_params): mqtt = MQTTClient( client_id=client, server=endpoint, port=8883, keepalive=1200, ssl=True, ssl_params=sslp, ) print("Connecting to AWS IoT...") mqtt.connect() print("Connected.") return mqtt def mqtt_subscribe(topic, msg): print("on received...") print("msg.decode(): {}".format(msg.decode())) global pattern_list for i, color in enumerate(["white", "red", "yellow", "blue"]): pattern_list[i] = json.loads(msg.decode())[color] # 一番長いパターンに合わせるため、不足分に"0"を追加する length_list = [] for i in range(len(pattern_list)): length_list.append(len(pattern_list[i])) max_length = max(length_list) print("length_list:{} max_length: {}".format(length_list, max_length)) for i in range(len(pattern_list)): for _ in range(max_length - length_list[i]): pattern_list[i] = pattern_list[i] + "0" for i in range(len(pattern_list)): print("pattern_list[{}]: {}".format(i, pattern_list[i])) try: mqtt = mqtt_connect() mqtt.set_callback(mqtt_subscribe) mqtt.subscribe(topic_sub) except: print("Unable to connect to MQTT.") index = 0 while True: try: mqtt.check_msg() except: print("Unable to check for messages.") time.sleep(0.1) index += 1 if index >= len(pattern_list[0]): index = 0 for n in range(4): pattern = pattern_list[n][0 + index : 1 + index] led_list[n].value(int(pattern)) if index % 20 == 0: print("pattern_list: {}".format(pattern_list))
MQTT処理は、umqtt.simpleを、そのまま使用させていただいています。
umqtt.simple — MQTT client function
詳細は、https://dev.classmethod.jp/articles/cloudwatch-alarm-device-with-esp32-micropython/をご参照ください
4 Slack
Slackに投稿する内容は、以下のようなイメージになります。
- Whiteだけが、0.5秒単位で点滅(その他は消灯)
white:1111100000
- WhiteとBlueが、0.5秒単位で交互に点滅
white:1111100000 blue:0000011111
- ちょっと複雑なパターン
white : 000000000000000000000000001010100000 red : 0000110011000011000000001100000000 blue : 1010001000101000101010100000000000 yellow : 000000000000000000000000001010100000
投稿内容を処理するために作成したSlackアプリのマニフェストは以下のとおりです。
display_information: name: christmas-tree features: bot_user: display_name: christmas-tree always_online: false oauth_config: scopes: user: - groups:history bot: - incoming-webhook settings: event_subscriptions: request_url: https://xxxxxxxxxxxxxxxxxxxzzaf.lambda-url.ap-northeast-1.on.aws/ user_events: - message.groups org_deploy_enabled: false socket_mode_enabled: false token_rotation_enabled: false
投稿内容をLambdaに送信するために、Event Subscriptions は、API Gateway無しで、Lambdaの関数URLに直接接続されています。LambdaからのSlackへの書き込みは、Incoming Webhooks を使用しています。
投稿を受け取ったLambdaで、MQTTへPublishしているコードは、以下のとおりです。
lambda_function.py
import json import requests # requests を使用するためにLayerが必要 import boto3 url = "https://hooks.slack.com/services/xxxxxxxx/xxxxxxxx/xxxxxxxxxxxxDERSV1MxZ" respone_message = "設定しました" def text_to_pattern(text): lines = text.split("\n") pattern = {"white": "", "red": "", "yellow": "", "blue": ""} for line in lines: tmp = line.split(":") pattern[tmp[0].strip()] = tmp[1].strip() return pattern def send_pattern_to_mqtt(pattern): topic = "christmas-tree/0001" iot = boto3.client("iot-data") payload = pattern iot.publish(topic=topic, qos=1, payload=json.dumps(payload, ensure_ascii=False)) def send_response_to_slack(pattern): data = {"text": "{}\n{}".format(respone_message, json.dumps(pattern))} response = requests.post(url, data=json.dumps(data)) print("slack response: {} {}".format(response.status_code, response.text)) def job(text): pattern = text_to_pattern(text) send_response_to_slack(pattern) send_pattern_to_mqtt(pattern) def lambda_handler(event, context): # Enable Eventsの認証時に使用 # return { # 'statusCode': 200, # 'body': json.dumps(event["body"]) # } # Lambdaが投稿したrespone_messageが含まれていないものを処理対象としている if not respone_message in event: if "body" in event: body = json.loads(event["body"]) if "event" in body: event = body["event"] if "text" in event: job(event["text"])
5 最後に
完全に個人的な趣味の記録で、ほんと申し訳ありません。 ここまで、読んで下さった方に感謝です。